#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2025 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!

"""
Usage:
    edl -h | --help
    edl [--vid=vid] [--pid=pid] [--portname=portname] [--serial]
    edl [--loader=filename] [--memory=memtype] [--portname=portname] [--serial]
    edl [--debugmode] [--portname=portname] [--serial]
    edl [--gpt-num-part-entries=number] [--gpt-part-entry-size=number] [--gpt-part-entry-start-lba=number] [--portname=portname] [--serial]
    edl [--memory=memtype] [--skipstorageinit] [--maxpayload=bytes] [--sectorsize==bytes] [--portname=portname] [--serial]
    edl server [--tcpport=portnumber] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]  [--devicemodel=value]
    edl memorydump [--partitions=partnames] [--debugmode] [--vid=vid] [--pid=pid] [--portname=portname] [--serial] [--serial_number=serial_number]
    edl printgpt [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--debugmode]  [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial] [--devicemodel=value]
    edl gpt <directory> [--memory=memtype] [--lun=lun] [--genxml] [--loader=filename]  [--skipresponse] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial] [--devicemodel=value]
    edl r <partitionname> <filename> [--memory=memtype] [--sectorsize==bytes] [--lun=lun] [--loader=filename]  [--skipresponse] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial] [--devicemodel=value]
    edl rl <directory> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skip=partnames] [--genxml]  [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial] [--devicemodel=value]
    edl rf <filename> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--debugmode]  [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial] [--devicemodel=value]
    edl rs <start_sector> <sectors> <filename> [--lun=lun] [--sectorsize==bytes] [--memory=memtype] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial] [--devicemodel=value]
    edl w <partitionname> <filename> [--partitionfilename=filename] [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skipwrite] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value] [--skipstorageinit] [--portname=portname] [--serial]
    edl wl <directory> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skip=partnames] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value] [--skipstorageinit] [--portname=portname] [--serial]
    edl wf <filename> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--skipresponse] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value] [--skipstorageinit] [--portname=portname] [--serial]
    edl ws <start_sector> <filename> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skipwrite] [--skipresponse] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value] [--skipstorageinit] [--portname=portname] [--serial]
    edl e <partitionname> [--memory=memtype] [--skipwrite] [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--devicemodel=value] [--skipstorageinit] [--portname=portname] [--serial]
    edl es <start_sector> <sectors> [--memory=memtype] [--lun=lun] [--sectorsize==bytes] [--skipwrite] [--loader=filename] [--skipresponse] [--debugmode] [--vid=vid] [--pid=pid] [--devicemodel=value] [--skipstorageinit] [--portname=portname] [--serial]
    edl ep <partitionname> <sectors> [--memory=memtype] [--skipwrite] [--lun=lun] [--sectorsize==bytes] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--devicemodel=value] [--skipstorageinit] [--portname=portname] [--serial]
    edl footer <filename> [--memory=memtype] [--lun=lun] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]  [--devicemodel=value]
    edl peek <offset> <length> <filename> [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]
    edl peekhex <offset> <length> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]
    edl peekdword <offset> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl peekqword <offset> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl memtbl <filename> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl poke <offset> <filename> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl pokehex <offset> <data> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl pokedword <offset> <data> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl pokeqword <offset> <data> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl memcpy <offset> <size> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]
    edl secureboot [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl pbl <filename> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl qfp <filename> [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]
    edl getstorageinfo [--loader=filename] [--memory=memtype] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]  [--devicemodel=value]
    edl setbootablestoragedrive <lun> [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]  [--devicemodel=value]
    edl getactiveslot [--memory=memtype] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]  [--devicemodel=value]
    edl setactiveslot <slot> [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]  [--devicemodel=value]
    edl send <command> [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]  [--devicemodel=value]
    edl xml <xmlfile> [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]  [--devicemodel=value]
    edl rawxml <xmlstring> [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial]  [--devicemodel=value]
    edl reset [--resetmode=mode] [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--portname=portname] [--serial] [--devicemodel=value]
    edl nop [--loader=filename] [--debugmode] [--vid=vid] [--pid=pid] [--skipstorageinit] [--portname=portname] [--serial] [--devicemodel=value]
    edl modules <command> <options> [--memory=memtype] [--lun=lun] [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--devicemodel=value] [--portname=portname] [--serial]
    edl provision <xmlfile> [--loader=filename] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]  [--devicemodel=value]
    edl qfil <rawprogram> <patch> <imagedir> [--loader=filename] [--memory=memtype] [--debugmode] [--skipresponse] [--vid=vid] [--pid=pid] [--portname=portname] [--serial]  [--devicemodel=value]

Description:
    server                      # Run tcp/ip server
    printgpt                    # Print GPT Table information
    gpt                         # Save gpt table to given directory
    r                           # Read flash to filename
    rl                          # Read all partitions from flash to a directory
    rf                          # Read whole flash to file
    rs                          # Read sectors starting at start_sector to filename
    w                           # Write filename to partition to flash
    wl                          # Write all files from directory to flash
    wf                          # Write whole filename to flash
    ws                          # Write filename to flash at start_sector
    e                           # Erase partition from flash
    es                          # Erase sectors at start_sector from flash
    ep                          # Erase sector count from flash partition
    footer                      # Read crypto footer from flash
    peek                        # Dump memory at offset with given length to filename
    peekhex                     # Dump memory at offset and given length
    peekdword                   # Dump DWORD at memory offset
    peekqword                   # Dump QWORD at memory offset
    memtbl                      # Dump memory table to file
    poke                        # Write filename to memory at offset to memory
    pokehex                     # Write hex string data at offset to memory
    pokedword                   # Write DWORD to memory at offset
    pokeqword                   # Write QWORD to memory at offset
    memcpy                      # Copy memory from srcoffset with given size to dstoffset
    secureboot                  # Print secureboot fields from qfprom fuses
    pbl                         # Dump primary bootloader to filename
    qfp                         # Dump QFPROM fuses to filename
    getstorageinfo              # Print storage info in firehose mode
    setbootablestoragedrive     # Change bootable storage drive to lun number
    send                        # Send firehose command
    xml                         # Send firehose xml file
    rawxml                      # Send firehose xml raw string
    reset                       # Send firehose reset command, reset modes: reset, off, edl
    nop                         # Send firehose nop command
    modules                     # Enable submodules, for example: "oemunlock enable"
    setactiveslot               # Set partition as active (Slot A/B)
    provision                   # UFS provision
    qfil                        # Write rawprogram xml files
                                # <rawprogram> : program config xml, such as rawprogram_unsparse.xml or rawprogram*.xml
                                # <patch> : patch config xml, such as patch0.xml or patch*.xml
                                # <imagedir> : directory name of image files

Options:
    --loader=filename                  Use specific EDL loader, disable autodetection [default: None]
    --vid=vid                          Set usb vendor id used for EDL [default: -1]
    --pid=pid                          Set usb product id used for EDL [default: -1]
    --lun=lun                          Set lun to read/write from (UFS memory only)
    --maxpayload=bytes                 Set the maximum payload for EDL [default: 0x100000]
    --sectorsize=bytes                 Set default sector size
    --memory=memtype                   Set memory type ("NAND", "eMMC", "UFS", "spinor")
    --partitionfilename=filename       Set partition table as filename for streaming mode
    --partitions=partnames             Skip reading partition with names != "partname1,partname2,etc."
    --skipwrite                        Do not allow any writes to flash (simulate only)
    --skipresponse                     Do not expect a response from phone on read/write (some Qualcomms)
    --skipstorageinit                  Skip storage initialisation
    --debugmode                        Enable verbose mode
    --gpt-num-part-entries=number      Set GPT entry count [default: 0]
    --gpt-part-entry-size=number       Set GPT entry size [default: 0]
    --gpt-part-entry-start-lba=number  Set GPT entry start lba sector [default: 0]
    --tcpport=portnumber               Set port for tcp server [default: 1340]
    --skip=partnames                   Skip reading partition with names "partname1,partname2,etc."
    --genxml                           Generate rawprogram[lun].xml
    --devicemodel=value                Set device model
    --portname=portname                Set serial port name (/dev/ttyUSB0 for Linux/MAC; \\\\.\\COM1 for Windows)
    --serial                           Use serial port (port autodetection)
    --slot                             Set active slot for setactiveslot [a or b]
    --resetmode=mode                   Resetmode for reset (poweroff, reset, edl, etc.)
"""

import logging
import os
import re
import subprocess
import sys
import time

from docopt import docopt

from edlclient.Config.usb_ids import default_ids
from edlclient.Library.Connection.seriallib import serial_class
from edlclient.Library.Connection.usblib import usb_class
from edlclient.Library.firehose_client import firehose_client
from edlclient.Library.sahara import sahara
from edlclient.Library.sahara_defs import cmd_t, sahara_mode_t
from edlclient.Library.streaming import Streaming
from edlclient.Library.streaming_client import streaming_client
from edlclient.Library.utils import LogBase
from edlclient.Library.utils import is_windows

args = docopt(__doc__, version='3')

print("Qualcomm Sahara / Firehose Client V3.62 (c) B.Kerler 2018-2025.")


class main(metaclass=LogBase):
    def __init__(self, args, imported: str = ''):
        self.serial = None
        self.portname = None
        self.__logger = self.__logger
        self.info = self.__logger.info
        self.debug = self.__logger.debug
        self.error = self.__logger.error
        self.warning = self.__logger.warning
        self.cdc = None
        self.sahara = None
        self.vid = None
        self.pid = None
        self.serial_number = None
        self.args = args
        self.fh = None

        if imported == '':
            self.imported = True
        else:
            self.imported = False

    def parse_cmd(self, rargs):
        if self.imported:
            return ""

        cmds = ["server", "printgpt", "gpt", "r", "rl", "rf", "rs", "w", "wl", "wf", "ws", "e", "es", "ep", "footer",
                "peek", "peekhex", "peekdword", "peekqword", "memtbl", "poke", "pokehex", "pokedword", "pokeqword",
                "memcpy", "secureboot", "pbl", "qfp", "getstorageinfo", "setbootablestoragedrive", "getactiveslot",
                "setactiveslot",
                "send", "xml", "rawxml", "reset", "nop", "modules", "memorydump", "provision", "qfil"]
        for cmd in cmds:
            if rargs[cmd]:
                return cmd
        return ""


    def console_cmd(self, cmd):
        read = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, close_fds=True)
        try:
            output = read.stdout.read().decode('utf-8')
        except:
            output = read.stdout.read().decode("ISO-8859-1")
        return output


    def parse_option(self, rargs):
        options = {}
        for arg in rargs:
            if "--" in arg or "<" in arg:
                options[arg] = rargs[arg]
        return options

    def doconnect(self, loop) -> dict:
        while not self.cdc.connected:
            self.cdc.connected = self.cdc.connect(portname=self.portname)
            if not self.cdc.connected:
                sys.stdout.write('.')
                if loop == 5:
                    sys.stdout.write('\n')
                    self.info("Hint:   Press and hold vol up+dwn, connect usb. For some, only use vol up.")
                    self.info("Xiaomi: Press and hold vol dwn + pwr, in fastboot mode connect usb.\n" +
                              "        Run \"./fastpwn oem edl\".")
                    self.info("Other:  Run \"adb reboot edl\".")
                    sys.stdout.write('\n')

                if loop >= 20:
                    sys.stdout.write('\n')
                    loop = 6
                loop += 1
                time.sleep(1)
                sys.stdout.flush()
            else:
                self.info("Device detected :)")
                try:
                    resp = self.sahara.connect()
                    self.vid = self.cdc.vid
                    self.pid = self.cdc.pid
                except Exception as err:  # pylint: disable=broad-except
                    self.debug(str(err))
                    continue
                if "mode" in resp:
                    mode = resp["mode"]
                    self.info(f"Mode detected: {mode}")
                    return resp
        return {"mode": "error"}

    def exit(self, status: int = 0, cdc_close: bool = True):
        if cdc_close:
            self.cdc.close()

        if self.imported:
            return status
        else:
            sys.exit(status)

    def run(self):
        if is_windows():
            proper_driver = self.console_cmd(r'reg query HKLM\HARDWARE\DEVICEMAP\SERIALCOMM')
            if re.findall(r'QCUSB', str(proper_driver)):
                self.warning(f'Please first install libusb_win32 driver from Zadig')

        loop = 0
        vid = int(self.args["--vid"], 16)
        pid = int(self.args["--pid"], 16)
        interface = -1
        if vid != -1 and pid != -1:
            portconfig = [[vid, pid, interface]]
        else:
            portconfig = default_ids
        if self.args["--debugmode"]:
            logfilename = "log.txt"
            if os.path.exists(logfilename):
                os.remove(logfilename)
            self.fh = logging.FileHandler(logfilename)
            self.__logger.addHandler(self.fh)
            self.__logger.setLevel(logging.DEBUG)
        else:
            self.__logger.setLevel(logging.INFO)

        if self.args["--serial"]:
            self.serial = True
        else:
            self.serial = False
        if self.args["--portname"]:
            self.portname = self.args["--portname"]
            self.serial = True
        else:
            self.portname = ""
        if self.serial:
            self.cdc = serial_class(loglevel=self.__logger.level, portconfig=portconfig)
        else:
            if self.args["--serial_number"]:
                self.serial_number = self.args["--serial_number"]
            self.cdc = usb_class(portconfig=portconfig, loglevel=self.__logger.level, serial_number=self.serial_number)

        self.sahara = sahara(self.cdc, loglevel=self.__logger.level)

        if self.args["--loader"] == 'None':
            self.info("Trying with no loader given ...")
            self.sahara.programmer = ""
        else:
            loader = self.args["--loader"]
            self.info(f"Using loader {loader} ...")
            self.sahara.programmer = loader

        self.info("Waiting for the device")
        self.cdc.timeout = 1500
        conninfo = self.doconnect(loop)
        mode = conninfo["mode"]
        try:
            version = conninfo.get("data").version
        except AttributeError:
            version = 2
        if mode == "sahara":
            cmd = conninfo["cmd"]
            if cmd == cmd_t.SAHARA_HELLO_REQ:
                if "data" in conninfo:
                    data = conninfo["data"]
                    if data.mode == sahara_mode_t.SAHARA_MODE_MEMORY_DEBUG:
                        if self.args["memorydump"] or self.cdc.pid == 0x900E:
                            time.sleep(0.5)
                            print("Device is in memory dump mode, dumping memory")
                            if self.args["--partitions"]:
                                self.sahara.debug_mode(self.args["--partitions"].split(","), version=version)
                            else:
                                self.sahara.debug_mode(version=version)
                            return self.exit()
                        else:
                            print("Device is in streaming mode, uploading loader")
                            self.cdc.timeout = None
                            sahara_info = self.sahara.streaminginfo()
                            if sahara_info:
                                sahara_connect = self.sahara.connect()
                                if len(sahara_connect) == 3:
                                    mode, cmd, resp = sahara_connect
                                else:
                                    mode, resp = sahara_connect
                                if mode == "sahara":
                                    mode = self.sahara.upload_loader(version=version)
                                    if "enprg" in self.sahara.programmer.lower():
                                        mode = "load_enandprg"
                                    elif "nprg" in self.sahara.programmer.lower():
                                        mode = "load_nandprg"
                                    elif mode != "":
                                        mode = "load_" + mode
                                    if "load_" in mode:
                                        time.sleep(0.3)
                                    else:
                                        print("Error, couldn't find suitable enprg/nprg loader :(")
                                        return self.exit()
                    else:
                        sahara_info = self.sahara.cmd_info(version=version)
                        if sahara_info is not None:
                            resp = self.sahara.connect()
                            mode = resp["mode"]
                            if "data" in resp:
                                data = resp["data"]
                            if mode == "sahara":
                                mode = self.sahara.upload_loader(version=version)
                        else:
                            print("Error on sahara handshake, resetting.")
                            self.sahara.cmd_reset()
                            return self.exit(1)
        else:
            if self.__logger.level != logging.DEBUG:
                self.__logger.setLevel(logging.ERROR)
        if mode == "error":
            print("Connection detected, quiting.")
            return self.exit(1)
        elif mode == "firehose":
            if "enprg" in self.sahara.programmer.lower():
                mode = "enandprg"
            elif "nprg" in self.sahara.programmer.lower():
                mode = "nandprg"
            if mode != "firehose":
                streaming = Streaming(self.cdc, self.sahara, self.__logger.level)
                if streaming.connect(1):
                    print("Successfully uploaded programmer :)")
                    mode = "nandprg"
                else:
                    print("No suitable loader found :(")
                    return self.exit()
        if mode != "firehose":
            sc = streaming_client(self.args, self.cdc, self.sahara, self.__logger.level, print)
            cmd = self.parse_cmd(self.args)
            options = self.parse_option(self.args)
            if "load_" in mode:
                options["<mode>"] = 1
            else:
                options["<mode>"] = 0
            sc.handle_streaming(cmd, options)
        else:
            self.cdc.timeout = None
            cmd = self.parse_cmd(self.args)
            if cmd == 'provision':
                self.args["--memory"] = 'ufs'
                self.args["--skipstorageinit"] = 1
            self.fh = firehose_client(self.args, self.cdc, self.sahara, self.__logger.level, print)
            options = self.parse_option(self.args)
            if cmd != "" or self.imported:
                self.info("Trying to connect to firehose loader ...")
                if self.fh.connect(sahara):
                    if self.imported:
                        return self.exit(cdc_close=False)
                    elif not self.fh.handle_firehose(cmd, options):
                        return self.exit(1)
                else:
                    return self.exit(1)


if __name__ == '__main__':
    base = main(args, __name__)
    base.run()
